Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(file sink): Upgrade file sink for tokio-compat #1988

Merged
merged 30 commits into from
Mar 19, 2020

Conversation

MOZGIII
Copy link
Contributor

@MOZGIII MOZGIII commented Mar 5, 2020

This is the file sink upgraded for tokio-compat support. It's being developed atop of the #1922 and will be rebased to master when then branch is merged. For now, the base of the PR is set to the tokio-compat branch.

It relies heavily on the solution proposed by @LucioFranco at #1945.

I tried to preserve the tests as much as possible.

This PR also introduces the ExpiringHashMap - a component that's designed to be reusable. I decided to introduce it because I saw this logic implemented multiple times across the codebase. This should make it easier to add expiration functionality to the existing code where we just use HashMaps - for instance in places where we need per stream buffers, sockets, files and so on.

To do:

@MOZGIII MOZGIII force-pushed the tokio-compat-file-sink branch from 920a2d5 to 147b53c Compare March 5, 2020 01:47
@MOZGIII
Copy link
Contributor Author

MOZGIII commented Mar 5, 2020

Aaand just after I created a PR noticed a problem.

The sinks::file::tests::many_partitions now randomly fails.
Actually, it's not even random: RUST_BACKTRACE=1 TEST_LOG=trace cargo test sinks::file passes reliably, but cargo test fails reliably on the sinks::file::tests::many_partitions.
I this must be a race condition somewhere, but I don't quite get where. Something tells me it could be at streaming_sink.rs - where we do tokio02::spawn...

Copy link
Contributor

@LucioFranco LucioFranco left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This LOOOKS great! Left some comments inline, I'd like to see how we could clean up the giant select statement. I think we should try to use the tokio select macro.

src/expiring_hash_map.rs Outdated Show resolved Hide resolved
src/expiring_hash_map.rs Outdated Show resolved Hide resolved
src/expiring_hash_map.rs Outdated Show resolved Hide resolved
mod bytes_path;
use bytes_path::BytesPath;

mod streaming_sink;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could probably place this stuff in sinks/mod.rs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we end up going with it - definitely! I think we can extract bytes_path into global utils too. I'll do a final pass on the locations of things before the merge.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving it upper now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aborting, leaving it as-is for now, we'll correct it later.

src/sinks/file/mod.rs Outdated Show resolved Hide resolved
src/sinks/file/mod.rs Outdated Show resolved Hide resolved
src/sinks/file/mod.rs Outdated Show resolved Hide resolved
src/sinks/file/mod.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@LucioFranco LucioFranco left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking great, left some comments :)

src/sinks/file/mod.rs Outdated Show resolved Hide resolved
src/sinks/file/mod.rs Show resolved Hide resolved
src/sinks/file/mod.rs Outdated Show resolved Hide resolved
src/sinks/file/streaming_sink/compat.rs Outdated Show resolved Hide resolved
@@ -0,0 +1,83 @@
use super::StreamingSink;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reasoning for moving this out of being on the streamingsink type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rationale is this code is more of a compatibility layer to the pre-async topology. This is partially outlined in the comments to the units inside the mod already. The idea is we can support this pull-based operation mode as a first-class at the topology layer, and it then we will be able to simply remove this mod. The StreamingSink type will remain though. For this reason, I think it's a good idea to separate them from the start by design.

If you are asking in regards to why use separate files - I just feel like it's cleaner to have multiple smaller files rather than a big one, especially if they have different concerns, like in this case.

Copy link
Contributor

@LucioFranco LucioFranco left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a couple comments about testing, happy to chat over slack about any deeper questions :)

src/expiring_hash_map/mod.rs Outdated Show resolved Hide resolved
src/expiring_hash_map/tests.rs Outdated Show resolved Hide resolved
src/expiring_hash_map/tests.rs Outdated Show resolved Hide resolved
rt.block_on_std(async {
let mut map = ExpiringHashMap::<String, String>::new();

let a_minute_ago = Instant::now() - Duration::from_secs(60);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also be using the built in test-util functionality to test timers, refer to the tokio time tests where we pause the time and advance it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw those at the tokio repo, but, unfortunately, everything is pub(create) there. It would be nice to use that indeed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What API's from clock do you need?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally all of them. I haven't looked thoroughly, but I think I'll need a clock-based Instance override there, and the calls to freeze time (pause, advance, etc).

My idea is that instead of practically waiting, we should freeze time, then shift time into the future. This, in turn, should trigger timer - and then we poll again, and the test should pass.

There should be a better way than doing delays in tests, cause it's a problematic design for a whole lot of reasons. I'm already used to time freezing in more mature test ecosystems of other languages, so it really catches my attention. 😄

Copy link
Member

@lukesteensen lukesteensen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking pretty good to me! Seems like we'll be blocked on adding a runtime to SinkContext, but otherwise seems pretty close to mergeable?

@MOZGIII
Copy link
Contributor Author

MOZGIII commented Mar 11, 2020

We're also blocked by #1922.

In addition to that, there's a API change to the ExpiringHashMap that I forgot to push apparently. One part of that, that's still pending is the documentation for ExpiringHashMap. It's important because the new API lets easily shooting yourself in the foot and get a spinlock if not properly used.
We've discussed the approach to the API with @LucioFranco, and the consensus is that we want to offer this API for now - instead of a more limited but safer one - due to the fact that we can't anticipate all the possible use cases. Another reason to provide API in this particular form is that it allows us to implement poll_expired as the async fn: async fn poll_expired(...) -> Option<Result<...>> . One of the alternative signatures was fn poll_expired(...) -> Option<impl Future<Output = Result<...>>>, it was imo better communicating the API contract, but @LucioFranco found the arguments to convince me we should go with the async fn variant. I want to document this decision-making process too (considered design options, tradeoffs and the rationales that contributed to the final decision). To some degree.

@MOZGIII
Copy link
Contributor Author

MOZGIII commented Mar 12, 2020

@LucioFranco I've implemented the ExpiringHashMap API we settled on during the discussion.

Please re-review!

Copy link
Member

@lukesteensen lukesteensen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still looking good to me.

/// # });
/// ```
pub async fn next_expired(&mut self) -> Option<Result<ExpiredItem<K, V>, Error>> {
let key = self.expiration_queue.next().await?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the error here just a timer error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. This is a little bit of a bummer to expose since it's something we'd usually unwrap, but I'm fine with either as long as we make the nature of the possible error clear.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right! I forgot to add the errors section 😦

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

huh this is odd I thought we removed errors for timers from tokio 0.2 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do timers just panic now?
It might make sense to return errors from timers IMO, but I'm not aware of how the development unfolds in tokio. 😄 Either way - it doesn't make sense to go ahead of the tokio changes. We'll just remove errors here if/when relevant tokio update arrives.

Copy link
Contributor

@LucioFranco LucioFranco left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking good, left some comments inline.

use tokio02::time::{delay_queue, DelayQueue, Error};

#[cfg(test)]
mod tests;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not-minor nit: the file isn't big enough to split these can we just have the tests and the map in the same file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll change it since you asked. However, I personally think this style of keeping tests mod in an adjacent file is easier to maintain.
I don't think I should have a strong opinion on how to organize files yet, since I'm still relatively new in this project, and haven't touched/seen the majority of the code. But these minor details usually matter to me. On this particular topic, I'll just say I like how I did it more. 😄

src/expiring_hash_map/mod.rs Outdated Show resolved Hide resolved
src/expiring_hash_map/mod.rs Outdated Show resolved Hide resolved
use tokio02 as tokio;
use tokio_test::{assert_pending, assert_ready, task};

fn unwrap_ready<T>(poll: Poll<T>) -> T {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like using https://docs.rs/tokio-test/0.2.0/tokio_test/macro.assert_ready_eq.html would work well here, happy to add more asserts to tokio-test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ready eq requires specifying a type for expected Ok. I don't know how to work around that, but I'm happy to correct it if you show me the way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't be working with result types? You can peek the internal impl here https://docs.rs/tokio-test/0.2.0/src/tokio_test/macros.rs.html#174 it just does basically what this does.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do assert_eq!(unwrap_ready(fut.poll()).unwrap().unwrap().0, "val");
I can't find a way to do the same with assert_ready_eq.
For instance:

  • assert_ready_eq!(fut.unwrap().unwrap().0, "val"); - down't work, cause the first argument has to be the future;
  • assert_ready_ok!(fut.unwrap().0, "val"); - down't work either for the same reason;
  • assert_ready_ok!(fut, Some(("val", ?)); - I'd love to write it in this form, but I don't know the value of the Key here, so it doesn't work either.

I'd be happy to improve anything there. The solution I settled on works, but I'm not very pleased with it.

src/sinks/file/mod.rs Show resolved Hide resolved
src/sinks/file/mod.rs Outdated Show resolved Hide resolved
@@ -0,0 +1,66 @@
use super::StreamingSink;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to just expose a struct that handles all of this instead of a bunch of functions for now. Lets try to keep it simple.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean? Something like struct Compat? Sorry, I don't understand what you're proposing here.

src/sinks/file/streaming_sink/mod.rs Outdated Show resolved Hide resolved
@MOZGIII MOZGIII force-pushed the tokio-compat branch 2 times, most recently from 09eea32 to 6cd58d6 Compare March 18, 2020 15:13
@MOZGIII MOZGIII force-pushed the tokio-compat-file-sink branch from 2ea0709 to e6428be Compare March 19, 2020 00:04
@MOZGIII MOZGIII changed the base branch from tokio-compat to master March 19, 2020 00:04
@github-actions
Copy link

github-actions bot commented Mar 19, 2020

Great PR! Please pay attention to the following items before merging:

Files matching Cargo.lock:

  • Has at least one other team member approved the dependency changes?

Files matching src/**:

  • For each failure path, is there sufficient context logged for users to investigate the issue?
  • Do the tests ensure that behavior is sane for inputs that don't meet normal assumptions (e.g. missing field, non-string, etc)?
  • Did you add adequate documentation?

This is an automatically generated QA checklist based on modified files

@MOZGIII MOZGIII marked this pull request as ready for review March 19, 2020 13:18
MOZGIII added 7 commits March 19, 2020 19:27
This reverts commit dd5c922.

Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
- switch StreamingSink to impl Stream - provides a significant boot to
  composability;
- simpler compat layer - this compat approach should be much easier to
  understand, it consists of a set of simple and documented components
  that are composed together in a meaningful way;
- switch tests to directly working with FileSink instead of futures
  compat layer - this build upon on StreamingSink taking impl Stream
  instead of mpsc::Receiver.

Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
MOZGIII added 23 commits March 19, 2020 19:27
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
…expired

Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
Signed-off-by: MOZGIII <mike-n@narod.ru>
@MOZGIII MOZGIII force-pushed the tokio-compat-file-sink branch from 1a5b49d to d5b8f44 Compare March 19, 2020 16:27
@MOZGIII
Copy link
Contributor Author

MOZGIII commented Mar 19, 2020

Merging this to unlock the hotfix for the console sink.

Pending questions to be solved after the merge.

@MOZGIII MOZGIII merged commit 9716e0b into master Mar 19, 2020
@binarylogic binarylogic deleted the tokio-compat-file-sink branch April 24, 2020 20:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants